---
title: 'Kafka'
description: 'Configuring the Kafka backend (Self-Hosted)'
sidebarTitle: 'Kafka'
icon: 'signal-stream'
---

### Getting Started

<img src="https://d15jtxgb40qetw.cloudfront.net/kafka.svg" alt="kafka" className="not-prose h-20" />

{/*
    Add custom content here (under this comment)...

    e.g.

    **Creating Account**<br />
    Go to the **[🔗 website](https://odigos.io) > Account** and click **Sign Up**

    **Obtaining Access Token**<br />
    Go to **⚙️ > Access Tokens** and click **Create New**

    !! Do not remove this comment, this acts as a key indicator in `docs/sync-dest-doc.py` !!
    !! START CUSTOM EDIT !!
*/}

{/*
    !! Do not remove this comment, this acts as a key indicator in `docs/sync-dest-doc.py` !!
    !! END CUSTOM EDIT !!
*/}

### Configuring Destination Fields

<Accordion title="Supported Signals:">
  ✅ Traces
  ✅ Metrics
  ✅ Logs
</Accordion>

- **KAFKA_PROTOCOL_VERSION** `string` : Protocol Version. The Kafka protocol version to use.
  - This field is required
  - Example: `2.0.0`
- **KAFKA_BROKERS** `string[]` : Brokers. The list of Kafka brokers to connect to.
  - This field is optional and defaults to `["localhost:9092"]`
- **KAFKA_RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY** `boolean` : Resolve Canonical Bootstrap Servers Only. Whether to resolve and then reverse-lookup broker IPs during startup.
  - This field is optional and defaults to `False`
- **KAFKA_CLIENT_ID** `string` : Client ID. The client ID to configure the Sarama Kafka client with. The client ID will be used for all produce requests.
  - This field is optional and defaults to `sarama`
- **KAFKA_TOPIC** `string` : Topic. The name of the default Kafka topic to export to (defaults to `otlp_spans` for traces, `otlp_metrics` for metrics, and `otlp_logs` for logs).
  - This field is optional
- **KAFKA_TOPIC_FROM_ATTRIBUTE** `string` : Topic from Attribute. The resource attribute whose value should be used as the message's topic (see the note below).
  - This field is optional
- **KAFKA_ENCODING** `string` : Encoding. The encoding of the traces sent to Kafka (`otlp_proto` or `otlp_json`).
  - This field is optional and defaults to `otlp_proto`
- **KAFKA_PARTITION_TRACES_BY_ID** `boolean` : Partition Traces by ID. Configures the exporter to include the trace ID as the message key in trace messages sent to Kafka. Note that this setting has no effect on Jaeger encodings, since Jaeger exporters include the trace ID as the message key by default.
  - This field is optional and defaults to `False`
- **KAFKA_PARTITION_METRICS_BY_RESOURCE_ATTRIBUTES** `boolean` : Partition Metrics by Resource Attributes. Configures the exporter to include the hash of sorted resource attributes as the message partitioning key in metric messages sent to Kafka.
  - This field is optional and defaults to `False`
- **KAFKA_PARTITION_LOGS_BY_RESOURCE_ATTRIBUTES** `boolean` : Partition Logs by Resource Attributes. Configures the exporter to include the hash of sorted resource attributes as the message partitioning key in log messages sent to Kafka.
  - This field is optional and defaults to `False`
- **KAFKA_AUTH_METHOD** `string` : Auth Method. The authentication method to use (`none` or `plain_text`). When `plain_text` is selected, `KAFKA_USERNAME` and `KAFKA_PASSWORD` must also be provided (see the example after this list).
  - This field is required and defaults to `none`
- **KAFKA_USERNAME** `string` : Username. The username to use when `KAFKA_AUTH_METHOD` is `plain_text`.
  - This field is optional
- **KAFKA_PASSWORD** `string` : Password. The password to use when `KAFKA_AUTH_METHOD` is `plain_text`.
  - This field is optional
- **KAFKA_METADATA_FULL** `boolean` : Metadata Full. Whether to maintain a full set of metadata. When disabled, the client does not make the initial request to the broker at startup.
  - This field is optional and defaults to `False`
- **KAFKA_METADATA_MAX_RETRY** `string` : Metadata Max Retry. The maximum number of retries when fetching metadata.
  - This field is optional and defaults to `3`
- **KAFKA_METADATA_BACKOFF_RETRY** `string` : Metadata Backoff Retry. How long to wait between metadata retries.
  - This field is optional and defaults to `250ms`
- **KAFKA_TIMEOUT** `string` : Timeout. The timeout for every attempt to send data to the backend.
  - This field is optional and defaults to `5s`
- **KAFKA_RETRY_ON_FAILURE_ENABLED** `boolean` : Enable Retry on Failure. Whether to retry sending a batch that could not be delivered.
  - This field is optional and defaults to `True`
- **KAFKA_RETRY_ON_FAILURE_INITIAL_INTERVAL** `string` : Initial Interval. Time to wait after the first failure before retrying; ignored if `KAFKA_RETRY_ON_FAILURE_ENABLED` is `false`.
  - This field is optional and defaults to `5s`
- **KAFKA_RETRY_ON_FAILURE_MAX_INTERVAL** `string` : Max Interval. The upper bound on the backoff interval; ignored if `KAFKA_RETRY_ON_FAILURE_ENABLED` is `false`.
  - This field is optional and defaults to `30s`
- **KAFKA_RETRY_ON_FAILURE_MAX_ELAPSED_TIME** `string` : Max Elapsed Time. The maximum amount of time spent trying to send a batch; ignored if `KAFKA_RETRY_ON_FAILURE_ENABLED` is `false`.
  - This field is optional and defaults to `120s`
- **KAFKA_PRODUCER_MAX_MESSAGE_BYTES** `string` : Producer Max Message Bytes. The maximum permitted size of a message in bytes.
  - This field is optional and defaults to `1000000`
- **KAFKA_PRODUCER_REQUIRED_ACKS** `string` : Producer Required Acks. Controls when a message is regarded as transmitted (`0` = do not wait for acknowledgment, `1` = wait for the leader, `-1` = wait for all replicas).
  - This field is optional and defaults to `1`
- **KAFKA_PRODUCER_COMPRESSION** `string` : Producer Compression. The compression codec used when producing messages to Kafka (`none`, `gzip`, `snappy`, `lz4`, or `zstd`).
  - This field is optional and defaults to `none`
- **KAFKA_PRODUCER_FLUSH_MAX_MESSAGES** `string` : Producer Flush Max Messages. The maximum number of messages the producer will send in a single broker request; `0` means no limit.
  - This field is optional and defaults to `0`
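
To see how these fields fit together, here is a minimal sketch of a `spec.data` block for a cluster that requires `plain_text` authentication. The broker addresses, username, and compression choice are placeholder assumptions; adapt them to your environment. The password is supplied separately through the optional Kubernetes Secret shown in the manifest further down this page.

```yaml
data:
  KAFKA_PROTOCOL_VERSION: '2.0.0'
  # Hypothetical broker addresses; the value uses the same list format as the default ["localhost:9092"].
  KAFKA_BROKERS: '["kafka-0.kafka.svc.cluster.local:9092", "kafka-1.kafka.svc.cluster.local:9092"]'
  KAFKA_AUTH_METHOD: 'plain_text'
  KAFKA_USERNAME: 'odigos-producer' # hypothetical username
  # KAFKA_PASSWORD is intentionally not set here; it comes from the optional Secret referenced by secretRef.
  KAFKA_ENCODING: 'otlp_proto'
  KAFKA_PRODUCER_COMPRESSION: 'gzip'
```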

<Note>
  The destination topic can be defined in a few different ways, with the following order of precedence:

  1. When `KAFKA_TOPIC_FROM_ATTRIBUTE` is configured, and the corresponding attribute is found on the ingested data, the value of that attribute is used.
  2. If a prior component in the collector pipeline sets the topic on the context via the `topic.WithTopic` function (from the [github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/kafka/topic) package), the value set in the context is used.
  3. Finally, the `KAFKA_TOPIC` configuration is used as a default/fallback destination.
</Note>
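
As a concrete sketch of this precedence, the following `spec.data` fragment routes each message to the topic named by a resource attribute and falls back to a fixed topic when that attribute is missing. The attribute name `messaging.kafka.topic` and the fallback topic `odigos-telemetry` are illustrative assumptions, not required values:

```yaml
data:
  # Use the value of this resource attribute as the destination topic, when present.
  KAFKA_TOPIC_FROM_ATTRIBUTE: 'messaging.kafka.topic' # hypothetical attribute name
  # Fallback topic for data whose resources do not carry the attribute above.
  KAFKA_TOPIC: 'odigos-telemetry' # hypothetical topic name
```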

### Adding Destination to Odigos

There are two primary methods for configuring destinations in Odigos:

##### **Using the UI**

<Steps>
  <Step>
    Use the [Odigos CLI](https://docs.odigos.io/cli/odigos_ui) to access the UI
    ```bash
    odigos ui
    ```
  </Step>
  <Step>
    Click on `Add Destination`, select `Kafka` and follow the on-screen instructions
  </Step>
</Steps>

##### **Using Kubernetes manifests**

<Steps>
  <Step>
    Save the YAML below to a file (e.g. `kafka.yaml`)
    ```yaml
    apiVersion: odigos.io/v1alpha1
    kind: Destination
    metadata:
      name: kafka-example
      namespace: odigos-system
    spec:
      data:
        KAFKA_AUTH_METHOD: '<Auth Method (default: none) (options: [none, plain_text])>'
        KAFKA_PROTOCOL_VERSION: <Protocol Version>
        # Note: The commented fields below are optional.
        # KAFKA_BROKERS: <Brokers (default: ["localhost:9092"])>
        # KAFKA_RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY: <Resolve Canonical Bootstrap Servers Only>
        # KAFKA_CLIENT_ID: <Client ID (default: sarama)>
        # KAFKA_TOPIC: <Topic>
        # KAFKA_TOPIC_FROM_ATTRIBUTE: <Topic from Attribute>
        # KAFKA_ENCODING: <Encoding (default: otlp_proto) (options: [otlp_proto, otlp_json])>
        # KAFKA_PARTITION_TRACES_BY_ID: <Partition Traces by ID>
        # KAFKA_PARTITION_METRICS_BY_RESOURCE_ATTRIBUTES: <Partition Metrics by Resource Attributes>
        # KAFKA_PARTITION_LOGS_BY_RESOURCE_ATTRIBUTES: <Partition Logs by Resource Attributes>
        # KAFKA_USERNAME: <Username>
        # KAFKA_METADATA_FULL: <Metadata Full>
        # KAFKA_METADATA_MAX_RETRY: <Metadata Max Retry (default: 3)>
        # KAFKA_METADATA_BACKOFF_RETRY: <Metadata Backoff Retry (default: 250ms)>
        # KAFKA_TIMEOUT: <Timeout (default: 5s)>
        # KAFKA_RETRY_ON_FAILURE_ENABLED: <Enable Retry on Failure (default: True)>
        # KAFKA_RETRY_ON_FAILURE_INITIAL_INTERVAL: <Initial Interval (default: 5s)>
        # KAFKA_RETRY_ON_FAILURE_MAX_INTERVAL: <Max Interval (default: 30s)>
        # KAFKA_RETRY_ON_FAILURE_MAX_ELAPSED_TIME: <Max Elapsed Time (default: 120s)>
        # KAFKA_PRODUCER_MAX_MESSAGE_BYTES: <Producer Max Message Bytes (default: 1000000)>
        # KAFKA_PRODUCER_REQUIRED_ACKS: <Producer Required Acks (default: 1)>
        # KAFKA_PRODUCER_COMPRESSION: <Producer Compression (default: none) (options: [none, gzip, snappy, lz4, zstd])>
        # KAFKA_PRODUCER_FLUSH_MAX_MESSAGES: <Producer Flush Max Messages (default: 0)>
      destinationName: kafka
      # Uncomment the 'secretRef' below if you are using the optional Secret.
      # secretRef:
      #   name: kafka-secret
      signals:
      - TRACES
      - METRICS
      - LOGS
      type: kafka

    ---

    # The following Secret is optional. Uncomment the entire block if you need to use it.
    # apiVersion: v1
    # data:
    #   KAFKA_PASSWORD: <Base64 Password>
    # kind: Secret
    # metadata:
    #   name: kafka-secret
    #   namespace: odigos-system
    # type: Opaque
    ```
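
    If you enable `plain_text` authentication, uncomment the `secretRef` above together with the Secret block, and remember that values under a Secret's `data` must be base64-encoded. A filled-in sketch, assuming a placeholder password `changeme`:
    ```yaml
    # Hypothetical Secret for illustration only.
    apiVersion: v1
    kind: Secret
    metadata:
      name: kafka-secret
      namespace: odigos-system
    type: Opaque
    data:
      KAFKA_PASSWORD: Y2hhbmdlbWU= # base64 encoding of the placeholder password "changeme"
    ```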
  </Step>
  <Step>
    Apply the YAML using `kubectl`
    ```bash
    kubectl apply -f kafka.yaml
    ```
  </Step>
</Steps>